// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements.  See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License.  You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
= MapReduce API

:javaFile: {javaCodeDir}/MapReduce.java

== Overview

Ignite provides an API for performing simplified MapReduce operations.
The MapReduce pattern is based on the assumption that the task that you
want to execute can be split into multiple jobs (the mapping phase),
with each job executed separately. The results produced by each job are
aggregated into the final result (the reducing phase).

In a distributed system such as Ignite, the jobs are distributed between
the nodes according to the preconfigured link:distributed-computing/load-balancing[load balancing strategy] and the results are aggregated on the node that submitted the task.

The MapReduce pattern is provided by the `ComputeTask` interface.

[NOTE]
====
Use `ComputeTask` only when you need fine-grained control over the
job-to-node mapping or custom failover logic. In all other cases,
use link:distributed-computing/distributed-computing#executing-an-igniteclosure[simple closures].
====

== Understanding Compute Task Interface

The `ComputeTask` interface lets you implement custom map and reduce logic. The interface has three methods: `map(...)`, `result(...)`, and `reduce(...)`.

The `map()` method creates the compute jobs from the input parameter and maps them to worker nodes. The method receives the collection of cluster nodes on which the task is to be run and the task's input parameter. It returns a map with jobs as keys and mapped worker nodes as values. The jobs are then sent to the mapped nodes and executed there.

The `result()` method is called after each job completes and returns an instance of `ComputeJobResultPolicy` that indicates how to proceed with the task. The method receives the result of the completed job and the list of all job results received so far. The method may return one of the following values:

- `WAIT` - wait for all remaining jobs to complete (if any);
- `REDUCE` - immediately move to the reduce step, discarding all the remaining jobs and results not yet received;
- `FAILOVER` - fail the job over to another node (see link:distributed-computing/fault-tolerance[Fault Tolerance]).

The `reduce()` method is called during the reduce step, when all the jobs have completed (or the `result()` method returned the `REDUCE` result policy for a particular job). The method receives a list with all completed results and returns the final result of the computation.
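
To make the order of the three calls concrete, here is a minimal plain-Java model of the flow described above. It is only a sketch and does not use the Ignite API: `TaskLifecycleSketch`, `WordJob`, `Policy`, and the node names are hypothetical stand-ins, nodes are represented as strings, and all jobs run locally and sequentially.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of the map/result/reduce flow. None of these types are Ignite classes.
class TaskLifecycleSketch {
    enum Policy { WAIT, REDUCE }

    // Hypothetical job: computes the length of one word.
    static final class WordJob {
        private final String word;

        WordJob(String word) { this.word = word; }

        int execute() { return word.length(); }
    }

    // map(): create one job per word and assign the jobs to nodes round-robin.
    static Map<WordJob, String> map(List<String> nodes, String input) {
        Map<WordJob, String> jobs = new LinkedHashMap<>();
        String[] words = input.split(" ");
        for (int i = 0; i < words.length; i++)
            jobs.put(new WordJob(words[i]), nodes.get(i % nodes.size()));
        return jobs;
    }

    // result(): called after each job completes; this sketch always waits for the rest.
    static Policy result(int latest, List<Integer> receivedSoFar) {
        return Policy.WAIT;
    }

    // reduce(): sum the per-word lengths into the final result.
    static int reduce(List<Integer> results) {
        int total = 0;
        for (int length : results)
            total += length;
        return total;
    }

    // Stand-in for the framework: runs every job locally and in order.
    static int execute(List<String> nodes, String input) {
        List<Integer> received = new ArrayList<>();
        for (WordJob job : map(nodes, input).keySet()) {
            int latest = job.execute(); // on a cluster, this would run on the mapped node
            received.add(latest);
            if (result(latest, received) == Policy.REDUCE)
                break; // skip the remaining jobs and reduce what has arrived
        }
        return reduce(received);
    }

    public static void main(String[] args) {
        List<String> nodes = Arrays.asList("node-1", "node-2");
        System.out.println(execute(nodes, "Hello Ignite Enabled World")); // prints 23
    }
}
```

In this model, returning `Policy.REDUCE` from `result()` would end the loop early, which mirrors how the `REDUCE` policy moves straight to the reduce step with the results received so far.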

//When you submit a compute task for execution via the `IgniteCompute.execute()` method, ..

== Executing a Compute Task

To execute a compute task, call the `IgniteCompute.execute(...)` method and pass the input parameter for the compute task as the last argument.
////
TODO: should we provide the full example for C#?
////
[tabs]
--
tab:Java[]
[source,java]
----
include::{javaFile}[tags=execute-compute-task;!exclude,indent=0]
----
tab:C#/.NET[]
[source,csharp]
----
include::code-snippets/dotnet/MapReduceApi.cs[tag=mapReduceComputeTask,indent=0]
----
tab:C++[unsupported]
--

You can limit the execution of jobs to a subset of nodes by using a link:distributed-computing/cluster-groups[cluster group].


== Handling Job Failures

If a node crashes or becomes unavailable during task execution, the built-in failover mechanism automatically sends all jobs scheduled for that node to another available node. If a job throws an exception, you can also treat the job as failed and fail it over to another node for re-execution. To do this, return `FAILOVER` from the `result(...)` method:

[source, java]
----
include::{javaFile}[tags=failover,indent=0]
----
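
The decision itself can be modeled without a cluster. The following plain-Java sketch is not Ignite code: `FailoverSketch`, `JobResult`, `runOn`, and the node names are hypothetical, and the failure on `node-1` is simulated. It shows a `result()`-style check that returns `FAILOVER` when a job reported an exception, and a driver that then retries the job on the next node.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java model of exception-driven failover. None of these types are Ignite classes.
class FailoverSketch {
    enum Policy { WAIT, FAILOVER }

    // Hypothetical job outcome: either a value or the exception the job threw.
    static final class JobResult {
        final Integer data;
        final Exception error;

        JobResult(Integer data, Exception error) {
            this.data = data;
            this.error = error;
        }
    }

    // result(): fail the job over if it threw an exception, otherwise keep waiting.
    static Policy result(JobResult res) {
        return res.error != null ? Policy.FAILOVER : Policy.WAIT;
    }

    // Hypothetical job execution: always fails on "node-1", succeeds elsewhere.
    static JobResult runOn(String node, String word) {
        if (node.equals("node-1"))
            return new JobResult(null, new IllegalStateException("simulated failure on " + node));
        return new JobResult(word.length(), null);
    }

    // Stand-in for the framework: retries the job on the next node after FAILOVER.
    static int executeWithFailover(List<String> nodes, String word) {
        for (String node : nodes) {
            JobResult res = runOn(node, word);
            if (result(res) == Policy.WAIT)
                return res.data; // the job succeeded on this node
        }
        throw new IllegalStateException("No node left to fail over to");
    }

    public static void main(String[] args) {
        // Fails on node-1, then succeeds on node-2.
        System.out.println(executeWithFailover(Arrays.asList("node-1", "node-2"), "Ignite")); // prints 6
    }
}
```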


== Compute Task Adapters

There are several helper classes that provide the most commonly used implementations of the `result(...)` and `map(...)` methods.

* `ComputeTaskAdapter` - This class implements the `result()` method to return the `FAILOVER` policy if a job throws an exception and the `WAIT` policy otherwise. As a result, the task waits for all jobs to finish with a result.

* `ComputeTaskSplitAdapter` - This class extends `ComputeTaskAdapter` and implements the `map(...)` method to automatically assign jobs to nodes. It introduces a new `split(...)` method that implements the logic of producing jobs based on the input data.

See link:{githubUrl}/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskSplitAdapter.java[ComputeTaskSplitAdapter.java,window=_blank] and link:{githubUrl}/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskAdapter.java[ComputeTaskAdapter.java,window=_blank] for details.
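
The division of labor in a split-style adapter can be illustrated with a small plain-Java model. This is a sketch under stated assumptions, not the Ignite implementation: `SplitTaskSketch` and `CharCountSketch` are hypothetical classes, nodes are plain strings, jobs are `java.util.function.Supplier` instances, and round-robin assignment is an assumption made for the example. The base class owns `map(...)`, so a subclass only supplies `split(...)` and `reduce(...)`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Plain-Java model of a split-style adapter. None of these types are Ignite classes.
abstract class SplitTaskSketch<T, R> {
    // Subclasses only describe how to turn the input into jobs...
    protected abstract List<Supplier<R>> split(int nodeCount, T arg);

    // ...and how to combine the job results.
    protected abstract R reduce(List<R> results);

    // map() is provided by the base class: jobs are assigned round-robin (an assumption).
    final Map<Supplier<R>, String> map(List<String> nodes, T arg) {
        Map<Supplier<R>, String> assignment = new LinkedHashMap<>();
        int i = 0;
        for (Supplier<R> job : split(nodes.size(), arg))
            assignment.put(job, nodes.get(i++ % nodes.size()));
        return assignment;
    }

    // Stand-in for the framework: runs all jobs locally, then reduces.
    final R execute(List<String> nodes, T arg) {
        List<R> results = new ArrayList<>();
        for (Supplier<R> job : map(nodes, arg).keySet())
            results.add(job.get());
        return reduce(results);
    }
}

// Example subclass: counts the characters of a sentence, one job per word.
class CharCountSketch extends SplitTaskSketch<String, Integer> {
    @Override
    protected List<Supplier<Integer>> split(int nodeCount, String arg) {
        List<Supplier<Integer>> jobs = new ArrayList<>();
        for (final String word : arg.split(" ")) {
            jobs.add(new Supplier<Integer>() {
                @Override
                public Integer get() { return word.length(); }
            });
        }
        return jobs;
    }

    @Override
    protected Integer reduce(List<Integer> results) {
        int total = 0;
        for (int length : results)
            total += length;
        return total;
    }

    public static void main(String[] args) {
        List<String> nodes = Arrays.asList("node-1", "node-2", "node-3");
        System.out.println(new CharCountSketch().execute(nodes, "Hello Ignite Enabled World")); // prints 23
    }
}
```

The subclass never mentions nodes at all, which is the main convenience of the split-style approach when no custom job-to-node mapping is needed.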

== Distributed Task Session

NOTE: Not available in .NET/C#/C++.

For each task, Ignite creates a distributed session that holds information about the task and is visible to the task itself and to all jobs spawned by it. You can use this session to share attributes between jobs. Attributes can be assigned before or during job execution and become visible to other jobs in the same order in which they were set.

[tabs]
--
tab:Java[]
[source,java]
----
include::{javaFile}[tags=session;!exclude,indent=0]
----
tab:C#/.NET[unsupported]
tab:C++[unsupported]
--


////////////////////////////////////////////////////////////////////////////////
== Streaming Jobs Continuously

TODO
////////////////////////////////////////////////////////////////////////////////

== Compute Task Example

The following example demonstrates a simple character-counting application that splits a given string into words and calculates the length of each word in an individual job. The jobs are distributed to all cluster nodes.

[tabs]
--
tab:Java[]
[source,java]
----
include::{javaCodeDir}/ComputeTaskExample.java[tag=compute-task-example,indent=0]
----
tab:C#/.NET[]
[source,csharp]
----
include::code-snippets/dotnet/MapReduceApi.cs[tag=computeTaskExample,indent=0]
----
tab:C++[unsupported]
--

